Source code for hysop.parameters.tensor_parameter

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np
import sympy as sm

from hysop.constants import HYSOP_REAL
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.numerics import is_signed, is_unsigned, is_fp, is_complex
from hysop.tools.sympy_utils import subscripts, SymbolicBase
from hysop.parameters.parameter import Parameter


[docs] class TensorParameter(Parameter): """ A tensor parameter is np.ndarray of a given dtype and shape that may change value as simulation advances. """
[docs] def __new__( cls, name, shape, dtype=HYSOP_REAL, pretty_name=None, initial_value=None, min_value=None, max_value=None, ignore_nans=False, **kwds, ): """ Create or get an existing TensorParameter with a specific name shape and dtype. Parameters ---------- name: string A name for the parameter that uniquely identifies it. pretty_name: string A pretty name for the parameter. shape: array like of ints Shape of this TensorParameter. dtype: type convertible to np.dtype, optional Underlying dtype of this TensorParameter. Defaults to HYSOP_REAL. initial_value: optional, defaults to an uninitialized array. Initial value as a scalar or np.ndarray. If not initial value is given, all values initialized to 0. min_value: scalar, optional, defaults to None Minimum value allowed for this parameter. max_value: scalar, optional, defaults to None Maximum value allowed for this parameter. ignore_nans: bool Set this to True to allow NaN values. kwds: dict Base class arguments. Attributes ---------- shape: array like of ints Shape of this TensorParameter. dtype: type convertible to np.dtype Underlying dtype of this TensorParameter. min_value: scalar, optional, defaults to None Minimum value allowed for this TensorParameter. max_value: scalar, optional, defaults to None Maximum value allowed for this TensorParameter. ignore_nans: bool True if this TensorParameter can have NaN values. """ if ("allow_None" in kwds) and (kwds["allow_None"] is True): msg = "A TensorParameter cannot be allowed to be set to None." raise ValueError(msg) check_instance(name, (str, SymbolicBase)) if isinstance(name, SymbolicBase): symbol = name name = symbol._name pretty_name = symbol._pretty_name else: symbol = None pretty_name = first_not_None(pretty_name, name) check_instance(name, str) check_instance(pretty_name, str) check_instance(shape, (list, tuple), values=(int, np.integer), allow_none=True) check_instance(ignore_nans, bool) assert (min_value is None) or (max_value is None) or (min_value <= max_value) parameter_types = (np.ndarray,) if is_signed(dtype): parameter_types += (np.int8, np.int16, np.int32, np.int64, int) elif is_unsigned(dtype): parameter_types += (np.uint8, np.uint16, np.uint32, np.uint64) elif is_fp(dtype): parameter_types += ( np.float16, np.float32, np.float64, np.longdouble, float, ) elif is_complex(dtype): parameter_types += (np.complex64, np.complex128, np.clongdouble, complex) initial_value = cls._compute_initial_value( shape, dtype, initial_value, min_value, max_value, ignore_nans ) obj = super().__new__( cls, name=name, pretty_name=pretty_name, parameter_types=parameter_types, allow_None=False, initial_value=initial_value, **kwds, ) obj._min_value = min_value obj._max_value = max_value obj._ignore_nans = ignore_nans from hysop.symbolic.parameter import ( SymbolicTensorParameter, SymbolicScalarParameter, ) if obj.__class__ is TensorParameter: if symbol: check_instance(symbol, SymbolicTensorParameter) obj._symbol = symbol else: obj._symbol = SymbolicTensorParameter(parameter=obj) else: if symbol: check_instance(symbol, sm.Symbol) obj._symbol = symbol else: obj._symbol = SymbolicScalarParameter(parameter=obj) return obj
def __init__( self, name, shape, dtype=HYSOP_REAL, pretty_name=None, initial_value=None, min_value=None, max_value=None, ignore_nans=False, **kwds, ): super().__init__( name=name, pretty_name=pretty_name, parameter_types=None, allow_None=False, initial_value=initial_value, **kwds, ) @classmethod def _compute_initial_value( cls, shape, dtype, initial_value, min_value=None, max_value=None, ignore_nans=None, ): if not isinstance(dtype, np.dtype): dtype = np.dtype(dtype) if isinstance(initial_value, (tuple, list)): initial_value = np.asarray(initial_value, dtype=dtype) if isinstance(initial_value, np.ndarray): check_instance(initial_value, np.ndarray) assert initial_value.dtype == dtype, (initial_value.dtype, dtype) assert initial_value.shape == shape, (initial_value.shape, shape) cls.__check_values( a=initial_value, dtype=dtype, min_value=min_value, max_value=max_value, ignore_nans=ignore_nans, ) elif np.isscalar(initial_value): assert shape is not None assert (min_value is None) or (initial_value >= min_value) assert (max_value is None) or (initial_value <= max_value) initial_value = np.full(shape=shape, dtype=dtype, fill_value=initial_value) elif initial_value is None: if shape is None: initial_value = None else: initial_value = np.zeros(shape=shape, dtype=dtype) else: msg = "Unknown initial_value type {}" msg = msg.format(type(initial_value)) raise TypeError(msg) return initial_value
[docs] def reallocate_tensor(self, shape, dtype, initial_value=None): self._value = self._compute_initial_value( shape, dtype, initial_value, min_value=self.min_value, max_value=self.max_value, ignore_nans=self.ignore_nans, ) self._symbol = self._update_symbol()
def _update_symbol(self): raise NotImplementedError
[docs] def view(self, idx, name=None, pretty_name=None, **kwds): """Take a view on a scalar contained in the Parameter.""" assert self._value is not None initial_value = self._value[tuple(slice(k, k + 1) for k in idx)] _name = self.name + "_" + "_".join(str(i) for i in idx) _pretty_name = self.pretty_name + subscripts(ids=idx, sep="") name = first_not_None(name, _name) pretty_name = first_not_None(pretty_name, _pretty_name) if initial_value.size == 1: from hysop.parameters.scalar_parameter import ScalarParameter return ScalarParameter( name=name, pretty_name=pretty_name, initial_value=initial_value.ravel(), dtype=self.dtype, min_value=self.min_value, max_value=self.max_value, ignore_nans=self.ignore_nans, const=self.const, quiet=self.quiet, is_view=True, **kwds, ) else: return TensorParameter( name=name, pretty_name=pretty_name, initial_value=initial_value, dtype=self.dtype, shape=initial_value.shape, min_value=self.min_value, max_value=self.max_value, ignore_nans=self.ignore_nans, const=self.const, quiet=self.quiet, is_view=True, **kwds, )
[docs] def iterviews(self): """Iterate over all parameters views to yield scalarparameters.""" for idx in np.ndindex(self.shape): yield (idx, self.view(idx))
@classmethod def __check_values(cls, a, min_value, max_value, ignore_nans, dtype): if np.isscalar(a): a = np.asarray([a], dtype=dtype) assert (a is not None) and isinstance(a, np.ndarray) if ignore_nans: if min_value is not None: assert np.all(np.nanmin(a) >= min_value), "min value constraint failed." if max_value is not None: assert np.all(np.nanmax(a) <= max_value), "max value constraint failed." else: if np.any(np.isnan(a)): msg = "Given value contains NaNs:\n{}\n" msg = msg.format(a) raise ValueError(msg) if min_value is not None: assert np.all(np.min(a) >= min_value), "min value constraint failed." if max_value is not None: assert np.all(np.max(a) <= max_value), "max value constraint failed."
[docs] def check_values(self, a): return self.__check_values( a, dtype=self.dtype, min_value=self.min_value, max_value=self.max_value, ignore_nans=self.ignore_nans, )
def _get_shape(self): """Get parameter shape.""" return self._value.shape if (self._value is not None) else None def _get_size(self): """Get parameter size.""" return self._value.size if (self._value is not None) else 0 def _get_dtype(self): """Get parameter dtype.""" assert self._value is not None return self._value.dtype if (self._value is not None) else None def _get_ctype(self): """Get the data type of the discrete field as a C type.""" from hysop.backend.device.codegen.base.variables import dtype_to_ctype dtype = self.dtype return dtype_to_ctype(dtype) def _get_min_value(self): """Return minimum value allowed for array values.""" return self._min_value def _get_max_value(self): """Return maximum value allowed for array values.""" return self._max_value def _get_ignore_nans(self): """Return True if array can have NaNs.""" return self._ignore_nans def _get_value_impl(self): """Return a read-only reference on the underlying data buffer.""" assert self._value is not None view = self._value.view() view.flags.writeable = False return view.view() def _get_tensor_value(self): """ Get a read-only view on this array parameter but always as a numpy array, even for ScalarParameter parameters. """ return self._value.copy() def _set_value_impl(self, value): """Given value will be copied into internal buffer.""" assert self._value is not None assert (value is not None) and isinstance(value, np.ndarray) if value.shape != self.shape: msg = "Parameter shape mismatch, expected {} but got {}." msg = msg.format(self.shape, value.shape) raise ValueError(msg) if value.dtype != self.dtype: msg = "Parameter dtype mismatch, expected {} but got {}." msg = msg.format(self.dtype, value.dtype) raise ValueError(msg) if ( (self.min_value is not None) or (self.max_value is not None) or (self.ignore_nans is False) ): self.check_values(value) self._value[...] = value
[docs] def __getitem__(self, slices): """Get a read-only subview on this array parameter.""" assert self._value is not None view = self._value.__getitem__(slices) if np.isscalar(view): return view view.flags.writeable = False return view.view()
[docs] def __setitem__(self, slices, value): """Set a subview on this array parameter to a certain value (value can be broadcasted).""" assert self._value is not None self.check_values(value) self._value.__setitem__(slices, value)
[docs] def long_description(self): ss = """\ TensorParameter[name={}, pname={}] *shape: {} *dtype: {} *min_value: {} *max_value: {} *ignore_nans: {} *value:\n{} """.format( self.name, self.pretty_name, self.shape, self.dtype, self.min_value, self.max_value, self.ignore_nans, self.value, ) return ss
[docs] def short_description(self): attrs = ("name", "pretty_name", "shape", "dtype", "min_value", "max_value") info = [] for attr in attrs: val = getattr(self, attr) if val is not None: info.append(f"{attr}={val}") attrs = ", ".join(info) ss = "TensorParameter[{}]" ss = ss.format(attrs) return ss
shape = property(_get_shape) size = property(_get_size) dtype = property(_get_dtype) ctype = property(_get_ctype) min_value = property(_get_min_value) max_value = property(_get_max_value) ignore_nans = property(_get_ignore_nans) tensor_value = property(_get_tensor_value)